package com.atguigu.flink.wordcount;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

//静态导入  把A类中静态的内容，导入到B类中，在B类中可以直接使用，好比是自己的一样，不用A.xxx
import static org.apache.flink.api.common.typeinfo.Types.*;


/**
 * Created by Smexy on 2023/11/7
 *
        在使用lamda表达式编写函数接口实现时，可能会出现类型被擦除，需要主动告知编译器类型的信息。

            如果是非泛型类型，例如 WordCount，可以使用 returns(Class<T> typeClass)
            如果是泛型类型，例如 Tuple2<String, Integer>，不能使用 returns(Class<T> typeClass)，
                可以使用以下
                            returns(TypeHint<T> typeHint)
                            returns(TypeInformation<T> typeInfo)

 */
public class Demo7_LamdaTuple
{
    public static void main(String[] args) throws Exception {

        //定义WEBUI绑定的端口
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",3333);

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        //无界流
        DataStreamSource<String> dataStreamSource1 = environment.socketTextStream("hadoop102", 8888);

        dataStreamSource1.
            flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
                String[] words = value.split(" ");
                Arrays.stream(words)
                      .forEach(
                          word -> out.collect(Tuple2.of(word,1))
                      );
            })
            //.returns(new TypeHint<Tuple2<String, Integer>>() {})
            .returns(TUPLE(STRING,INT))  //推荐
            .keyBy(t -> t.f0)
            .sum(1)
            .print();

        environment.execute();



    }

    private static class MyWordCountFlatmapFunction2 implements FlatMapFunction<String, Tuple2<String,Integer>>
    {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = value.split(" ");
            Arrays.stream(words)
                  .forEach(
                      word -> out.collect(Tuple2.of(word,1))
                  );
        }
    }
}
